summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/otultraeasy.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/otultraeasy.py')
-rw-r--r--glucometerutils/drivers/otultraeasy.py54
1 files changed, 34 insertions, 20 deletions
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 41e2acd..d5eb3c9 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -18,6 +18,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device.
import binascii
import datetime
import logging
+from typing import Any, Dict, Generator, Optional
import construct
@@ -92,7 +93,13 @@ _READING_RESPONSE = construct.Struct(
)
-def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect):
+def _make_packet(
+ message: bytes,
+ sequence_number: int,
+ expect_receive: bool,
+ acknowledge: bool,
+ disconnect: bool,
+):
return _PACKET.build(
{
"data": {
@@ -115,24 +122,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
TIMEOUT = 0.5
- def __init__(self, device):
- super(Device, self).__init__(device)
+ def __init__(self, device: Optional[str]) -> None:
+ super().__init__(device)
self.sent_counter_ = False
self.expect_receive_ = False
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def connect(self):
+ def connect(self) -> None:
try:
self._send_packet(b"", disconnect=True)
self._read_ack()
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def disconnect(self):
+ def disconnect(self) -> None:
self.connect()
- def _send_packet(self, message, acknowledge=False, disconnect=False):
+ def _send_packet(
+ self, message: bytes, acknowledge: bool = False, disconnect: bool = False
+ ) -> None:
pkt = _make_packet(
message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
)
@@ -141,7 +150,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.serial_.write(pkt)
self.serial_.flush()
- def _read_packet(self):
+ def _read_packet(self) -> construct.Container:
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
logging.debug("received packet: %r", raw_pkt)
@@ -157,14 +166,19 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return pkt
- def _send_ack(self):
+ def _send_ack(self) -> None:
self._send_packet(b"", acknowledge=True, disconnect=False)
- def _read_ack(self):
+ def _read_ack(self) -> None:
pkt = self._read_packet()
assert pkt.link_control.acknowledge
- def _send_request(self, request_format, request_obj, response_format):
+ def _send_request(
+ self,
+ request_format: construct.Struct,
+ request_obj: Optional[Dict[str, Any]],
+ response_format: construct.Struct,
+ ) -> construct.Container:
try:
request = request_format.build(request_obj)
self._send_packet(request, acknowledge=False, disconnect=False)
@@ -182,7 +196,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo(
"OneTouch Ultra Easy glucometer",
serial_number=self.get_serial_number(),
@@ -190,26 +204,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
response = self._send_request(
_SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
)
return response.serial_number
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
response = self._send_request(
_DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
)
return response.timestamp
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
response = self._send_request(
_DATETIME_REQUEST,
{"request_type": "write", "timestamp": date},
@@ -217,17 +231,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
)
return response.timestamp
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
_GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
)
return response.unit
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
response = self._send_request(
_READ_RECORD_REQUEST,
{"record_id": _INVALID_RECORD},
@@ -235,14 +249,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
)
return response.count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> common.GlucoseReading:
response = self._send_request(
_READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
)
return common.GlucoseReading(response.timestamp, float(response.value))
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
yield self._get_reading(record_id)